package com.stars.distributed.schedule.core;

import com.stars.distributed.schedule.bean.DbScheduleServer;
import com.stars.distributed.schedule.enums.ThreadStatus;
import com.stars.distributed.schedule.factory.DefaultThreadFactory;
import com.stars.distributed.schedule.util.StringUtils;
import com.stars.distributed.schedule.util.SystemUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * 分布式调度框架基础线程
 *
 * @author guoguifang
 */
abstract class AbstractDistributedScheduleThread implements Runnable {

    protected final Logger logger = LoggerFactory.getLogger(this.getClass());

    /**
     * 创建时间
     */
    private long createTime;

    /**
     * 启动时间
     */
    private long startTime;

    /**
     * 结束时间
     */
    private long stopTime;

    /**
     * 是否启动线程
     */
    private boolean isRunning;

    /**
     * 是否暂停
     */
    private boolean isPause;

    /**
     * 线程状态
     */
    private ThreadStatus threadStatus;

    /**
     * 当前线程状态开始时间
     */
    private long threadStatusStartTime;

    /**
     * 调度器线程池，同一时间最大允许一个线程
     */
    private ExecutorService executorService;

    /**
     * 线程并发锁
     */
    private final ReentrantLock runLock = new ReentrantLock();

    /**
     * 线程并发锁
     */
    private final ReentrantLock mainLock = new ReentrantLock();

    /**
     * 线程空闲等待锁
     */
    private final ReentrantLock idleLock = new ReentrantLock();

    /**
     * 线程空闲等待锁监控
     */
    private final Condition idleTermination = idleLock.newCondition();

    /**
     * 线程空闲等待锁监控状态
     */
    private final AtomicInteger idleState = new AtomicInteger(0);

    /**
     * 线程阻塞锁
     */
    private final ReentrantLock blockLock = new ReentrantLock();

    /**
     * 线程阻塞锁监控
     */
    private final Condition blockTermination = blockLock.newCondition();

    /**
     * 线程阻塞锁监控状态
     */
    private final AtomicInteger blockState = new AtomicInteger(0);

    protected AbstractDistributedScheduleThread() {
        this.createTime = System.currentTimeMillis();
        this.threadStatus = ThreadStatus.INIT;
        this.threadStatusStartTime = System.currentTimeMillis();
    }

    /**
     * 执行线程任务之前需要执行的操作
     */
    protected void executeBefore() {
    }

    /**
     * 执行线程任务
     */
    protected abstract void execute();

    /**
     * 线程运行方法
     */
    @Override
    public final void run() {
        this.runLock.lock();
        try {
            // 如果不可运行直接返回
            if (!isRunning) {
                return;
            }

            // 执行任务前的操作
            executeBefore();

            // 线程开始运行
            startThread();
            while (isRunning) {
                // 如果处于暂停状态则返回
                if (checkIsPause()) {
                    continue;
                }
                // 若是"待运行"及"暂停中"状态下需修改为"运行中"状态
                changeThreadStatus(ThreadStatus.RUNNING);

                try {
                    execute();
                } catch (Throwable t) {
                    logger.error("The distributed scheduling framework [" + getThreadName() + "] execute failure!", t);
                    sleep(100);
                }
            }
            // 任务执行完成
            stopThread();
        } finally {
            this.runLock.unlock();
        }
    }

    /**
     * 启动线程
     */
    final void start() {
        this.mainLock.lock();
        try {
            // 只有当线程处于初始化状态时可以启动
            if (this.threadStatus != ThreadStatus.INIT) {
                logger.info("The distributed scheduling framework [{}] has been started successfully!", getThreadName());
                return;
            }
            // 修改线程可运行状态为true，并把线程状态修改为"待运行"状态，并把线程启动及关闭时间清零
            this.isRunning = true;
            this.changeThreadStatus(ThreadStatus.RUNABLE);
            this.startTime = 0L;
            this.stopTime = 0L;
            putThreadPool();
            // 记录线程启动日志
            logger.info("The distributed scheduling framework [{}] start success!", getThreadName());
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 暂停线程
     */
    final void suspend() {
        this.mainLock.lock();
        try {
            // 当线程处于初始化、正在尝试暂停...、已暂停状态时不可暂停
            if (this.threadStatus == ThreadStatus.INIT) {
                logger.warn("The distributed scheduling framework [{}] does not start successfully, can't suspend!", getThreadName());
                return;
            }
            if (this.threadStatus == ThreadStatus.PAUSING) {
                logger.info("The distributed scheduling framework [{}] is already trying to suspend!", getThreadName());
                return;
            }
            if (this.threadStatus == ThreadStatus.PAUSED) {
                logger.info("The distributed scheduling framework [{}] has been suspended!", getThreadName());
                return;
            }
            // 修改线程是否暂停状态为true，并把线程状态修改为"正在尝试暂停..."状态(线程暂停只能暂停下次调度而不能暂停本次调度，所以有"正在尝试暂停..."状态)
            this.isPause = true;
            this.changeThreadStatus(ThreadStatus.PAUSING);
            // 记录线程暂停日志
            logger.info("The distributed scheduling framework [{}] is trying to suspend!", getThreadName());
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 恢复暂停线程
     */
    final void resume() {
        this.mainLock.lock();
        try {
            // 当线程处于正在尝试暂停...、已暂停状态时才可以恢复
            if (this.threadStatus == ThreadStatus.INIT) {
                logger.warn("The distributed scheduling framework [{}] does not start successfully, can't suspend!", getThreadName());
                return;
            }
            if (this.threadStatus != ThreadStatus.PAUSING && this.threadStatus != ThreadStatus.PAUSED) {
                logger.info("The distributed scheduling framework [{}] status is {}, can't resume!", getThreadName(), this.threadStatus.getEnglishName());
                return;
            }
            // 修改线程是否暂停状态为true，并把线程状态修改为"暂停"状态
            this.isPause = false;
            synchronized (this) {
                try {
                    this.notify();
                } catch (Exception e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("The distributed scheduling framework [{}] notify failure!", getThreadName());
                    }
                }
            }
            this.changeThreadStatus(ThreadStatus.RUNABLE);
            // 记录线程恢复日志
            logger.info("The distributed scheduling framework [{}] resume success!", getThreadName());
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 停止线程
     */
    final void stop() {
        this.mainLock.lock();
        try {
            // 当线程处于初始化状态时不可停止
            if (this.threadStatus == ThreadStatus.INIT) {
                logger.warn("The distributed scheduling framework [{}] does not start successfully, can't stop!", getThreadName());
                return;
            }
            // 修改线程是否可运行及是否暂停状态为false，并把线程状态修改为"初始化"状态
            this.isRunning = false;
            this.isPause = false;
            this.changeThreadStatus(ThreadStatus.INIT);
            // 记录线程关闭日志
            logger.info("The distributed scheduling framework [{}] stop success!", getThreadName());
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 得到当前线程名称
     */
    public String getThreadName() {
        return StringUtils.join(new Object[]{SystemUtils.getHostIp(), SystemUtils.getPort(), this.getClass().getSimpleName()}, ":");
    }

    /**
     * 得到当前线程是否运行
     */
    public boolean isRunning() {
        return this.isRunning;
    }

    /**
     * 得到当前线程是否暂停
     */
    public boolean isPause() {
        return this.isPause;
    }

    /**
     * 得到当前线程创建时间
     */
    public long getThreadCreateTime() {
        return createTime;
    }

    /**
     * 得到当前线程状态
     */
    public ThreadStatus getThreadStatus() {
        return threadStatus;
    }

    /**
     * 得到当前线程状态开始时间
     */
    public long getThreadStatusHoldTime() {
        return System.currentTimeMillis() - threadStatusStartTime;
    }

    /**
     * 当前线程运行时长
     */
    public long getThreadRunningTime() {
        if (startTime == 0L) {
            return 0L;
        }
        if (stopTime == 0L) {
            return System.currentTimeMillis() - startTime;
        }
        return stopTime - startTime;
    }

    /**
     * 当前线程启动
     */
    protected void startThread() {
        this.startTime = System.currentTimeMillis();
    }

    /**
     * 当前线程关闭
     */
    protected void stopThread() {
        this.stopTime = System.currentTimeMillis();
    }

    /**
     * 改变当前线程状态
     */
    protected void changeThreadStatus(ThreadStatus threadStatus) {
        this.mainLock.lock();
        try {
            if (this.threadStatus != threadStatus) {
                this.threadStatus = threadStatus;
                this.threadStatusStartTime = System.currentTimeMillis();
            }
        } finally {
            this.mainLock.unlock();
        }
    }

    /**
     * 检查是否处于暂停状态，每次默认等待30秒，防止notify失败
     */
    protected boolean checkIsPause() {
        if (isPause) {
            synchronized (this) {
                try {
                    changeThreadStatus(ThreadStatus.PAUSED);
                    this.wait(30000L);
                } catch (InterruptedException e) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("The distributed scheduling framework [{}] wait interrupted!", getThreadName());
                    }
                }
            }
        }
        return isPause;
    }

    /**
     * 线程睡眠
     */
    protected void sleep(long sleepTime) {
        if (sleepTime > 0) {
            try {
                changeThreadStatus(ThreadStatus.BLOCKED);
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("The distributed scheduling framework [{}] sleep interrupted!", getThreadName());
                }
            }
        }
    }

    /**
     * 线程空闲等待
     */
    protected void awaitIdleMillis(long idleTimeMillis) {
        if (this.idleState.get() >= 0) {
            this.idleLock.lock();
            try {
                if (idleTimeMillis > 0) {
                    changeThreadStatus(ThreadStatus.IDLE);
                    this.idleState.getAndIncrement();
                    this.idleTermination.awaitNanos(TimeUnit.MILLISECONDS.toNanos(idleTimeMillis));
                }
            } catch (InterruptedException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("The distributed scheduling framework [{}] wait interrupted!", getThreadName());
                }
            } finally {
                this.idleLock.unlock();
            }
        }
        this.idleState.set(0);
    }

    /**
     * 解除线程等待
     */
    public void signalIdle() {
        this.idleLock.lock();
        try {
            this.idleTermination.signalAll();
            if (this.idleState.get() <= 0) {
                this.idleState.getAndDecrement();
            }
        } finally {
            this.idleLock.unlock();
        }
    }

    /**
     * 无可用线程阻塞
     */
    protected void awaitBlockMillis(long blockTimeMillis) {
        if (this.blockState.get() >= 0) {
            this.blockLock.lock();
            try {
                if (blockTimeMillis > 0) {
                    changeThreadStatus(ThreadStatus.BLOCKED);
                    this.blockState.getAndIncrement();
                    this.blockTermination.awaitNanos(TimeUnit.MILLISECONDS.toNanos(blockTimeMillis));
                }
            } catch (InterruptedException e) {
                if (logger.isDebugEnabled()) {
                    logger.debug("The distributed scheduling framework {} is interrupted in waiting!", getThreadName());
                }
            } finally {
                this.blockLock.unlock();
            }
        }
        this.blockState.set(0);
    }

    /**
     * 解除无可用线程阻塞
     */
    public void signalBlock() {
        this.blockLock.lock();
        try {
            this.blockTermination.signalAll();
            if (this.blockState.get() <= 0) {
                this.blockState.getAndDecrement();
            }
        } finally {
            this.blockLock.unlock();
        }
    }

    protected DbScheduleServer getDistributedScheduleServerConfig() {
        return DistributedScheduleManager.getSingleInstance().getDbScheduleServerConfig();
    }

    protected void putThreadPool() {
        if (executorService == null) {
            ThreadFactory namedThreadFactory = new DefaultThreadFactory().setNameFormat(getThreadName()).build();
            executorService = new ThreadPoolExecutor(0, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1), namedThreadFactory, new ThreadPoolExecutor.DiscardPolicy());
        }
        executorService.execute(this);
    }
}
